package com.sisyphus.source

import java.lang
import java.util.Properties

import com.sisyphus.utils.ParamsConf
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

/**
 * Factory for a Flink sink that writes `(String, String, Long)` results to Kafka.
 *
 * The class is `Serializable` because the nested [[ResultSerialization]] is a
 * non-static inner class: it may carry a reference to its enclosing instance,
 * and Flink has to serialize the schema when it ships the sink to the tasks.
 */
class KafkaSink extends Serializable {

  import java.nio.charset.StandardCharsets

  /**
   * Builds a Kafka producer sink targeting `ParamsConf.kafkaTopic` with
   * exactly-once (transactional) delivery semantics.
   *
   * @return a new `FlinkKafkaProducer` configured from `ParamsConf`
   */
  def flinkKafkaProducer(): FlinkKafkaProducer[(String, String, Long)] = {
    // Kafka brokers cap producer transaction timeouts at transaction.max.timeout.ms
    // (15 minutes by default), while FlinkKafkaProducer defaults to 1 hour. Without
    // an explicit value an EXACTLY_ONCE producer is rejected by a default broker.
    val transactionTimeoutMs = 15 * 60 * 1000

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", ParamsConf.kafkaHosts)
    // NOTE(review): "group.id" is a consumer-side setting; kept so the supplied
    // configuration stays unchanged, but it is presumably unused by the producer.
    properties.setProperty("group.id", ParamsConf.kafkaGroupId)
    properties.setProperty("transaction.timeout.ms", transactionTimeoutMs.toString)

    new FlinkKafkaProducer[(String, String, Long)](
      ParamsConf.kafkaTopic,
      new ResultSerialization,
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )
  }

  /**
   * Serializes a result tuple into a key-less Kafka record whose value is the
   * UTF-8 encoding of the tuple's `toString` form.
   */
  class ResultSerialization extends KafkaSerializationSchema[(String, String, Long)] {

    /**
     * @param t     the result tuple to write
     * @param aLong the timestamp handed over by the Kafka sink; not used here
     * @return a record for `ParamsConf.kafkaTopic` carrying only a value
     */
    override def serialize(t: (String, String, Long), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
      // Pin the charset: the no-arg getBytes() uses the JVM's platform default,
      // which would make the bytes on the topic depend on the host configuration.
      val payload = t.toString().getBytes(StandardCharsets.UTF_8)
      new ProducerRecord[Array[Byte], Array[Byte]](ParamsConf.kafkaTopic, payload)
    }
  }

}


